# Importing needed libraries and setting up the parameters
import os
import pathlib
import pandas as pd
from glob import glob
from global_land_mask import globe
import plotly.express as px
# Root directory that consumer.py writes its CSV output into; every data path
# below (raw/audit/online) is resolved relative to this.
ROOT_DIR = pathlib.Path(r"C:\Sparkathon") # The path needs to match where the consumer.py is producing the output files.
print("Initialization is completed. The directory where files are expected is: " + str(ROOT_DIR))
# Raw incoming driver data from the Kafka topic is captured and written to disk
# by consumer.py. Using this data, we can derive the insights listed below.
# Build one dataframe from all the CSV files available under the raw directory.
PATH_RAW = ROOT_DIR / 'data/raw'
EXT = '*.csv'
colnames = ['ts_minute', 'epoch_time', 'driver_id', 'on_duty', 'location']
# Recursively collect every CSV that consumer.py has produced under data/raw.
all_csv_raw_files = [file for path, subdir, files in os.walk(PATH_RAW)
                     for file in glob(os.path.join(path, EXT))]
if not all_csv_raw_files:
    # Fail fast with a clear message instead of pd.concat's cryptic
    # "No objects to concatenate" ValueError.
    raise FileNotFoundError(f"No CSV files found under {PATH_RAW}")
# The files are headerless; assign the column names at read time.
l_raw = [pd.read_csv(filename, header=None, names=colnames) for filename in all_csv_raw_files]
# ignore_index avoids duplicate index labels across files; duplicate labels
# would multiply rows in the index-aligned join done on the map sample below.
df_raw = pd.concat(l_raw, axis=0, sort=False, ignore_index=True)
print("Dataframe from raw data successfully created.")
# Normalize dtypes so the groupby/plot logic below behaves consistently.
df_raw['ts_minute'] = pd.to_datetime(df_raw['ts_minute'])
df_raw['driver_id'] = df_raw['driver_id'].astype(int)
df_raw['on_duty'] = df_raw['on_duty'].astype(int)
print("The datatypes are all valid.")
# Per-minute driver availability from the raw feed.
# Keep only plausible records (positive driver id, on_duty flag of 0 or 1),
# then count records per (minute, on_duty) bucket.
valid_mask = (df_raw['driver_id'] > 0) & (df_raw['on_duty'].isin([0, 1]))
minute_group_good_records = (
    df_raw[valid_mask]
    .groupby(['ts_minute', 'on_duty'])
    .agg({'driver_id': 'count'})
    .reset_index()
)
# One line per on_duty value: red = off duty (0), green = on duty (1).
fig = px.line(
    minute_group_good_records,
    x="ts_minute",
    y="driver_id",
    color='on_duty',
    color_discrete_sequence=["red", "green"],
)
fig.update_layout(
    title='Near Real Time Driver Availability (per minute)',
    xaxis_title='Minutes',
    yaxis_title='Record Count',
    font_size=16,
    legend_title_text='On Duty',
)
fig.show()
# Prepare a random sample of raw records for the geo-spatial map view.
# reset_index(drop=True) guarantees unique labels so the self-join below cannot
# multiply rows if the concatenated raw data carried duplicate index labels.
# min() keeps sample() from raising when fewer than 500 rows are available.
_df_raw = df_raw.sample(min(500, len(df_raw))).reset_index(drop=True)
# Expand the "lat lon" location string into two columns.
# NOTE: n must be passed by keyword -- the positional form was removed in pandas 2.0.
_coords = (_df_raw['location'].str.split(' ', n=1, expand=True)
           .rename(columns={0: 'lat', 1: 'lon'}))
_df_raw = _df_raw.join(_coords).copy()
_df_raw['lat'] = _df_raw['lat'].astype(float)
_df_raw['lon'] = _df_raw['lon'].astype(float)
# Keep only coordinates that fall on land (drops ocean/corrupt positions).
_df_raw['is_land'] = _df_raw.apply(lambda row: globe.is_land(row['lat'], row['lon']), axis=1)
_df_raw = _df_raw[_df_raw['is_land']].copy()
# Derived columns: coarse (0.1 degree) coordinates and time-of-day parts.
_df_raw['lat_r'] = _df_raw['lat'].round(1)
_df_raw['lon_r'] = _df_raw['lon'].round(1)
_df_raw['hour'] = _df_raw.ts_minute.dt.hour
_df_raw['minute'] = _df_raw.ts_minute.dt.minute
# Plot the sampled driver positions on an OpenStreetMap basemap.
fig = px.scatter_mapbox(_df_raw, lat="lat", lon="lon",
                        hover_name="driver_id",
                        color_discrete_sequence=["fuchsia"],
                        zoom=4,
                        height=300)
fig.update_layout(mapbox_style="open-street-map")
fig.update_layout(margin={"r": 0, "t": 0, "l": 0, "b": 0})
fig.show()
# This section analyzes the data coming in from the Kafka stream on a
# per-minute basis to calculate the metrics below.
# Build one dataframe from all the CSV files available under the audit directory.
# NOTE(review): 'Data/audit' differs in case from 'data/raw' above; this only
# works because Windows paths are case-insensitive -- confirm the real folder
# name before porting to a case-sensitive filesystem.
PATH_AUDIT = ROOT_DIR / 'Data/audit'
EXT = "*.csv"
COL_NAMES = ['ts_minute', 'received', 'processed', 'deleted']
all_csv_audit_files = [file for path, subdir, files in os.walk(PATH_AUDIT)
                       for file in glob(os.path.join(path, EXT))]
if not all_csv_audit_files:
    # Fail fast instead of letting pd.concat raise a cryptic ValueError.
    raise FileNotFoundError(f"No CSV files found under {PATH_AUDIT}")
# The files are headerless; assign the column names at read time.
l_audit = [pd.read_csv(filename, header=None, names=COL_NAMES) for filename in all_csv_audit_files]
# ignore_index prevents duplicate index labels across the per-minute files.
df_audit = pd.concat(l_audit, axis=0, ignore_index=True)
print("Dataframe from audit data successfully created.")
# Stacked bar of records received vs processed per minute.
fig = px.bar(df_audit, x="ts_minute",
             y=["received", "processed"],
             color_discrete_sequence=["yellow", "green"])
fig.update_layout(title='Audit Trail (Stacked Bar View)',
                  xaxis_title='Minutes',
                  yaxis_title='Record Count',
                  font_size=16,
                  legend_title_text='Stream Records')
fig.show()
# This section analyzes the data to depict the drivers available for hire.
# Build one dataframe from all the CSV files available under the online directory.
# NOTE(review): 'Data/online' differs in case from 'data/raw' above; works on
# case-insensitive Windows only -- confirm the real folder name.
PATH_ONLINE = ROOT_DIR / 'Data/online'
EXT = "*.csv"
COL_NAMES = ['ts_minute', 'online', 'available']
all_csv_online_files = [file for path, subdir, files in os.walk(PATH_ONLINE)
                        for file in glob(os.path.join(path, EXT))]
if not all_csv_online_files:
    # Fail fast instead of letting pd.concat raise a cryptic ValueError.
    raise FileNotFoundError(f"No CSV files found under {PATH_ONLINE}")
# The files are headerless; assign the column names at read time.
l_online = [pd.read_csv(filename, header=None, names=COL_NAMES) for filename in all_csv_online_files]
# ignore_index prevents duplicate index labels across the per-minute files.
df_online = pd.concat(l_online, axis=0, ignore_index=True)
# Drop rows with a negative 'available' count (presumably invalid/sentinel
# records from the stream -- TODO confirm against consumer.py).
df_online = df_online[df_online['available'] >= 0]
print("Dataframe from online data successfully created.")
# Bar view of online vs available-for-hire drivers per minute.
fig = px.bar(df_online, x="ts_minute",
             y=["online", "available"],
             color_discrete_sequence=["yellow", "green"])
fig.update_layout(title='Online Driver Availability',
                  xaxis_title='Minutes',
                  yaxis_title='Record Count',
                  font_size=16,
                  legend_title_text='Driver Availability')
fig.show()